Analysis of Reddit Comments

I pulled the dataset from Reddit's archive site, which hosts the "Complete Public Reddit Comments Corpus".
I'll bring the dataset into my environment, run an ETL over it, and apply LDA to identify the topics discussed in the comments.

Read Reddit Parquet Data For Analysis

We've pulled the dataset into an S3 bucket to allow Spark to process it further.

  • Input: JSON + bzip2 compression (50,687,364,160 bytes ≈ 50.7 GB)
  • Output: Parquet + gzip (52,395,455,189 bytes ≈ 52.4 GB)
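
The JSON-to-Parquet conversion itself isn't shown in this notebook; below is a minimal sketch of what it might have looked like, assuming the raw bzip2-compressed JSON dumps live under a hypothetical /mnt/mwc/reddit_json/ path and that the year partition is derived from each comment's created_utc field.

import pyspark.sql.functions as F

# Read the raw bzip2-compressed JSON dumps (hypothetical source path)
raw = spark.read.json("/mnt/mwc/reddit_json/")

# Derive a year column from the comment timestamp, then write gzip-compressed
# Parquet partitioned by year, producing the reddit_year_p layout used below
(raw
  .withColumn("year", F.year(F.from_unixtime("created_utc")))
  .write
  .partitionBy("year")
  .option("compression", "gzip")
  .parquet("/mnt/mwc/reddit_year_p/"))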

In [3]:
spark.read.parquet("/mnt/mwc/reddit_year_p/year=2012/").count()

In [4]:
# List partitions and register as temp tables
import pyspark.sql.functions as F
from pyspark.sql.types import *
# To create the complete dataset, create a temporary table per year, then build a master union table
df = sc.parallelize(dbutils.fs.ls("/mnt/mwc/reddit_year_p")).toDF()

# Parse the year partition to get an array of years to register the tables by
years = df.select(F.regexp_extract('name', r'(\d+)', 1).alias('year')).collect()
year_partitions = [r['year'] for r in years if r['year']]
year_partitions

# Loop over and register a table per year 
for y in year_partitions:
  df = sqlContext.read.parquet("/mnt/mwc/reddit_year_p/year=%s" % y)
  df.createOrReplaceTempView("reddit_%s" % y)

# Register the root directory for the complete dataset 
df_complete = spark.read.parquet("/mnt/mwc/reddit_year_p/")
# df_complete.createGlobalTempView("reddit_all")
df_complete.createOrReplaceTempView("reddit_all")

In [5]:
df_complete.rdd.getNumPartitions()

In [6]:
spark.sql("select * from reddit_all where year = 2014").explain(True)

In [7]:
%sql
select * from reddit_all where year = $arg

In [8]:
df_complete.printSchema()

In [9]:
display(df_complete)

A few ideas of what can be accomplished with this dataset

  • Identify and track topics associated with every subreddit and username (a rough LDA sketch follows this list)
  • Model flow of conversations (e.g. rate of replies compared to controversiality of comment/post)
  • Predict posts/subreddits a user will next engage with (i.e. recommender systems)
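
As an illustration of the first idea, here is a minimal, untested sketch of topic extraction with Spark ML's LDA over the comment bodies. The sample fraction, vocabulary size, and topic count are arbitrary placeholder values, not settings used elsewhere in this notebook.

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.clustering import LDA

# Work on a small sample of 2014 comment bodies to keep the sketch cheap
sample = spark.table("reddit_2014").select("body").sample(False, 0.001, seed=42)

# Tokenize, drop stop words, and build a bag-of-words representation
prep = Pipeline(stages=[
    RegexTokenizer(inputCol="body", outputCol="tokens", pattern="\\W+"),
    StopWordsRemover(inputCol="tokens", outputCol="filtered"),
    CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5.0)
]).fit(sample)
vectorized = prep.transform(sample)

# Fit a 20-topic LDA model; termIndices in the output map into the CountVectorizer vocabulary
lda_model = LDA(k=20, maxIter=10, featuresCol="features").fit(vectorized)
display(lda_model.describeTopics(maxTermsPerTopic=8))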

In [11]:
%sh
ls -lh /dbfs/mnt/mwc/reddit_year_p/year=2014/

In [12]:
%sql
-- run 1
select count(1) as count, year from reddit_all group by year order by year asc ;

In [13]:
%sql
-- run 2 
select count(1) as count, year from reddit_all group by year order by year asc ;

Total Number of Comments Posted per Day of Week in 2014


In [15]:
%sql
--- Find the number of comments per day of week for 2014
SELECT day, sum(comments) as counts from (
  SELECT date_format(from_unixtime(created_utc), 'EEEE') day, COUNT(*) comments
  FROM reddit_2014
  GROUP BY created_utc
  ORDER BY created_utc
) q2
GROUP BY day 
ORDER BY counts;

In [16]:
%sql
--- Find the number of comments per day of week for 2014
SELECT day, sum(comments) as counts from (
  SELECT date_format(from_unixtime(created_utc), 'EEEE') day, COUNT(*) comments
  FROM reddit_2014
  GROUP BY created_utc
  ORDER BY created_utc
) q2
GROUP BY day 
ORDER BY counts;

Best Time to Comment on Posts in 2014

See the SQL language manual in the Databricks Docs.


In [18]:
%sql
-- Select best time to comment on posts 
CREATE TABLE IF NOT EXISTS popular_posts_2014
  USING parquet 
  OPTIONS (
    path "/mnt/mwc/popular_posts_2014"
  ) 
AS SELECT 
  day,
  hour,
  SUM(IF(score >= 1000, 1, 0)) as score_gt_1k
FROM
  (SELECT 
    date_format(from_utc_timestamp(from_unixtime(created_utc), "PST"), 'EEEE') as day, 
    date_format(from_utc_timestamp(from_unixtime(created_utc), "PST"), 'h a') as hour,
    score
  FROM reddit_2014) q1
GROUP BY day, hour
ORDER BY day, hour

In [19]:
current_table = 'popular_posts_2014'
df = spark.read.parquet("/mnt/mwc/popular_posts_2014")
df.createOrReplaceTempView('popular_posts_2014')
display(table(current_table))

Matplotlib Visualization


In [21]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [22]:
# Define the labels sorted in my predefined order
column_labels = ["12 AM", "1 AM", "2 AM", "3 AM", "4 AM", "5 AM", "6 AM", "7 AM", "8 AM", "9 AM", "10 AM", "11 AM", "12 PM", "1 PM", "2 PM", "3 PM", "4 PM", "5 PM", "6 PM", "7 PM", "8 PM", "9 PM", "10 PM", "11 PM"]

# Zip the value-column name with each hour label, in the predefined order,
# to match the pivot table's MultiIndex columns
column2_name = ['Count of Comments > 1K Votes'] * len(column_labels)
column_label_sorted = list(zip(column2_name, column_labels))

# Define the row labels to map the calendar week
row_labels = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]

data = [[x.day, x.hour, x.score_gt_1k] for x in table(current_table).collect()]

In [23]:
# Create the Pivot Table
colNames = ['Day of Week', 'Hour', 'Count of Comments > 1K Votes']
data_m = pd.DataFrame(data,columns = colNames)
pvt = pd.pivot_table(data_m, index=['Day of Week'], columns=['Hour'])

In [24]:
# Reindex the rows into the predefined day-of-week order,
# and the columns (a MultiIndex of value name and hour) into the zipped hour order
pvt_sorted = pvt.reindex(row_labels, axis=0).reindex(column_label_sorted, axis=1)
pvt_sorted

In [25]:
data_p = pvt_sorted.values.transpose()
fig, ax = plt.subplots()
heatmap = ax.pcolor(data_p, cmap=plt.cm.Blues)

# put the major ticks at the middle of each cell
ax.set_xticks(np.arange(data_p.shape[1])+0.5, minor=False)
ax.set_yticks(np.arange(data_p.shape[0])+0.5, minor=False)

# want a more natural, table-like display
ax.invert_yaxis()
ax.xaxis.tick_top()

ax.set_xticklabels(row_labels, minor=False)
ax.set_yticklabels(column_labels, minor=False)
display()

R Visualizations


In [27]:
%r
# Install necessary packages to use ggplot2 
install.packages("ggplot2")
install.packages("reshape")
library(plyr)
library(reshape2)
library(scales)
library(ggplot2)

In [28]:
%r
scores <- sql("FROM popular_posts_2014 SELECT *")
local_df <- collect(scores)

# We can pivot the data in 2 ways, option 1 commented out
# local_df$day <- factor(local_df$day)
# xtabs(score_gt_1k ~ hour+day, local_df)

heat_val <- with(local_df, tapply(score_gt_1k, list(hour, day) , I)  )
# Define logical times
times <- c("12 AM", "1 AM", "2 AM", "3 AM", "4 AM", "5 AM", "6 AM", "7 AM", "8 AM", "9 AM", "10 AM", "11 AM", "12 PM", "1 PM", "2 PM", "3 PM", "4 PM", "5 PM", "6 PM", "7 PM", "8 PM", "9 PM", "10 PM", "11 PM")
heat_val[times, ]

In [29]:
%r
# Test out the factor API; the ordering has no visible effect until ggplot uses it
local_df.m <- melt(local_df)
local_df.m$hour <- factor(local_df.m$hour, levels=times)
local_df.m

In [30]:
%r
library(scales)
# melt() flattens the local R data.frame into a long format that ggplot can consume
local_df.m <- melt(local_df)
# factor() lets you specify the exact ordering of the values within a column. This matters because these hour labels have no natural, machine-sortable order.
local_df.m$hour <- factor(local_df.m$hour, levels=rev(times))
local_df.m$day <- factor(local_df.m$day, levels=c("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"))

# This provides the heatmap of the comment posts
p <- ggplot(local_df.m, aes(day, hour)) + geom_tile(aes(fill = value), colour = "white") + scale_fill_gradient(low = "white", high = "steelblue")
p

Look at the Average Length of Comments vs Comment Score


In [32]:
df = table("reddit_2010").unionAll(table("reddit_2011")).unionAll(table("reddit_2012"))
df.registerTempTable("reddit_201x")

In [33]:
dfc = sqlContext.sql("""SELECT
  score,
  AVG(LENGTH(body)) as avg_comment_length,
  STDDEV(LENGTH(body))/SQRT(COUNT(score)) as se_comment_length,
  COUNT(score) as num_comments
 FROM reddit_201x
 GROUP BY score
 ORDER BY score""") 

df = dfc.filter("score >= -200 and score <=2000").select("score", "avg_comment_length", "se_comment_length").toPandas()

In [34]:
from ggplot import *

p = ggplot(df, aes(x='score', y='avg_comment_length')) + \
    geom_line(size=0.25, color='red') + \
    ylim(0, 1100) + \
    xlim(-200, 2000) + \
    xlab("(# Upvotes - # Downvotes)") + \
    ylab("Avg. Length of Comment For Each Score") + \
    ggtitle("Relationship between Reddit Comment Score and Comment Length for Comments")
    
display(p)

Find the Most Active Subreddit Communities


In [36]:
%sql
SELECT subreddit, num_comments 
  FROM (
    SELECT count(*) as num_comments, 
          subreddit 
    FROM reddit_2014 
    GROUP BY subreddit
    ORDER BY num_comments DESC
    LIMIT 20
  ) t1

In [37]: